#include <algorithm>
#include <stdexcept>
#include <string>
#include <vector>

#include <realm/impl/simulated_failure.hpp>
#include <realm/string_data.hpp>
#include <realm/sync/client.hpp>
#include <realm/sync/network/default_socket.hpp>
#include <realm/sync/network/http.hpp>
#include <realm/sync/network/network.hpp>
#include <realm/sync/noinst/client_history_impl.hpp>
#include <realm/sync/noinst/protocol_codec.hpp>
#include <realm/sync/noinst/server/server.hpp>
#include <realm/sync/noinst/server/server_dir.hpp>
#include <realm/transaction.hpp>
#include <realm/version.hpp>

#include "test.hpp"

namespace realm::fixtures {

using namespace realm::sync;
using namespace realm::test_util;

// This public key must match the private key used to sign the token
// below (test.pem).
const char g_test_server_key_path[] = "test_pubkey.pem";

inline std::string test_server_key_path()
{
    return get_test_resource_path() + g_test_server_key_path;
}

// The Base64-encoded user token is generated by the following command:
//     cat test_token.json | base64
// The Base64-encoded signature is generated by the following command:
//     cat test_token.json | openssl dgst -sha256 -binary -sign test.pem | base64
// The two are concatenated with a ':'.
// This token does not contain a "path" field, and therefore grants access to
// all Realms.
const char g_signed_test_user_token[] = "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
                                        "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogbnVsbCwK"
                                        "ICAgICJhcHBfaWQiOiAiaW8ucmVhbG0uVGVzdCIKfQo="
                                        ":"
                                        "kPQwXUUFFVoDkmw02ouA1g7OlXcZ/IJPpqwJs9lIi1azpyuakBWgQ8VhnInCXh90CQXYhnteZlMw"
                                        "HYUZgt3/ED1jLu+nK2HlRKsmsOuAI20jMnHGGIZkql4/Ck9PEsvZ3huHGk5Jv9vpFp/dtnl1JXK2"
                                        "9XjdO8+1hU4boeJuKpTMDTPwGI9dxa8sTtvMMN7AVoPkKb1uqHZVsb5uRGE86Cyv58cvuj/EvZ1A"
                                        "yOCt5NGJwjTxydPgfX3QPcNMwDTHCRWYuoi2oTCINQHy8ebzXVLT1iy3adV4rM5bJukCnpLqHGlZ"
                                        "MIslk07zKdoj3igMIT47W9QwIuCw8x5f5cRIAg==";

const char g_unsigned_test_user_token[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogbnVsbCwK"
    "ICAgICJhcHBfaWQiOiAiaW8ucmVhbG0uVGVzdCIKfQo=";


// Token for user_0.
//{
//    "identity": "user_0",
//    "admin": false,
//    "timestamp": 1455530614,
//    "expires": null,
//    "app_id": "io.realm.Test"
//}
const char g_user_0_token[] = "ewogICAgImlkZW50aXR5IjogInVzZXJfMCIsCiAgICAiYWRtaW4iOiBmYWxzZSwKICAgICJ0aW"
                              "1lc3RhbXAiOiAxNDU1NTMwNjE0LAogICAgImV4cGlyZXMiOiBudWxsLAogICAgImFwcF9pZCI6"
                              "ICJpby5yZWFsbS5UZXN0Igp9Cg=="
                              ":"
                              "YC3MVU7lmF6uRbd72nJ8AWBI9/BmdLTutlAMy7+tkh0X08do3SDi3Kq++93b2+FWG8IOrVQge+"
                              "gvSMbaBEYgmV0QD7OL9Y29kjk8Ty7bvdFd2KoeQRb9IfSJXS0yd9d4OI0K/TfmGMjCh3j2gWkp"
                              "V5PTWg/V+T5oRfXXwUMIkSCAeCnd23YTr3NucdvkTcyjz0isW+E4uUcHePQA3Qeq0+/qPFrv4a"
                              "LGagtbYYnx2rCHZ8zZ1khMpKu/chF1kOit+eZqYB4Jgui5gXK3iSIWCwBlMOhofsNA9h5qmIFp"
                              "2SWQOk4s4bi962vJxIerIbYrsX4yzviz+yDX0UZRAStIjw==";


// Token for user_1.
// {
//    "identity": "user_1",
//    "admin": false,
//    "timestamp": 1455530614,
//    "expires": null,
//    "app_id": "io.realm.Test"
//}
const char g_user_1_token[] = "ewogICAgImlkZW50aXR5IjogInVzZXJfMSIsCiAgICAiYWRtaW4iOiBmYWxzZSwKICAgICJ0aW"
                              "1lc3RhbXAiOiAxNDU1NTMwNjE0LAogICAgImV4cGlyZXMiOiBudWxsLAogICAgImFwcF9pZCI6"
                              "ICJpby5yZWFsbS5UZXN0Igp9Cg=="
                              ":"
                              "iqFfuBuUx2i8nogJ2+ixA/vVUpAGaELIWktDlFyF5ZoE5xo+Jd+ElpK8Kiq7PQMd3ZwxwBBhMZ"
                              "M+PkgfLFcTA4hutZ5aCkbzB+DIuUCR7z0iDq4+rutIyQdvd9ujCOWYQXeE0F5TtSVWqf8baDG/"
                              "HXDGx6sASy+2PwUoZrllJbRPeclGQAsKZ6jq46ahH8mmvjKX0GCJHED4i20ZEr2VgeTPZ+9gYo"
                              "okj3RIbEzHF3SNLvae1ENY8O/mB5g+Hf71E6fnU/AuFTpbJlom0Lm72C7wgk88HjIqjfpo0L4l"
                              "9r54Q2rkIffdAbMRCrIqrT81AMZuTIGDkRbMLDLtsUifgQ==";


// Token for user_2.
// {
//    "identity": "user_2",
//    "admin": false,
//    "timestamp": 1455530614,
//    "expires": null,
//    "app_id": "io.realm.Test"
// }
const char g_user_2_token[] = "ewogICAgImlkZW50aXR5IjogInVzZXJfMiIsCiAgICAiYWRtaW4iOiBmYWxzZSwKICAgICJ0aW"
                              "1lc3RhbXAiOiAxNDU1NTMwNjE0LAogICAgImV4cGlyZXMiOiBudWxsLAogICAgImFwcF9pZCI6"
                              "ICJpby5yZWFsbS5UZXN0Igp9Cg=="
                              ":"
                              "q68rj8I66E10+EQ8+OuaSeD2U1zu1mWpRgLpt+fAX4JPYZssfUsOq7adY2IJRcBEYAwS4LjkgM"
                              "QGU9hwQ0PTLZzoYQQXqGfqsNHF/LvNv1P1DwGkf6fp5jsRcx+IauhmKiCKdg+SItaTuaZ1Duxs"
                              "9sMrL7NIa59fQDtefltNlbNBDMJDUFlNRdSQj4FEdhAkeNu9Qv2xMqDHKhcAZhvBcPxMYVno9E"
                              "69lEkxM0mIfoy5IFg81YRaB1gbSkhPs2HVo++j1jbtFRIv/cEA+PqeeHN+fDiMHQRRNtYRbJPh"
                              "hExS3rsZhsQSZPiU0urVw4fBFn9X5NuHuUJQO7vSdqJbwQ==";


// {
//     "identity": "user_0",
//     "admin": false,
//     "timestamp": 1455530614,
//     "expires": null,
//     "app_id": "io.realm.Test",
//     "path": "/test",
//     "access": ["download", "upload", "manage"]
// }
const char g_user_0_path_test_token[] = "ewogICAgImlkZW50aXR5IjogInVzZXJfMCIsCiAgICAiYWRtaW4iOiBmYWxzZSwKICAgICJ0aW1l"
                                        "c3RhbXAiOiAxNDU1NTMwNjE0LAogICAgImV4cGlyZXMiOiBudWxsLAogICAgImFwcF9pZCI6ICJp"
                                        "by5yZWFsbS5UZXN0IiwKICAgICJwYXRoIjogIi90ZXN0IiwKICAgICJhY2Nlc3MiOiBbImRvd25s"
                                        "b2FkIiwgInVwbG9hZCIsICJtYW5hZ2UiXQp9Cg=="
                                        ":"
                                        "E3hiuWjFFDUExrz5osiXFoDVSoX0168kdBNVBSEre/mYbr1s0A+mlvEH24ibklC8bh9K3BBtwjVQ"
                                        "v+vvFGThmZggbWDl/SAeeIP0MpQJR4FFAqkiB93Ax1Gi9b1i25lD5lGXc9CVsSNpWMV1LRD9I6Y9"
                                        "N70ENUd7vBuz0y4+y6k0A1HtaB6pzVx90kTtFaOVCz/UiMxsZOMKdRoNibIpmFU5Q5eP4UTsrZq/"
                                        "4rVOisRWno3eA01IQvz/ECtnt13KBc6rEzRzpA5tWMmwJpoykEpAUUFnx8N6LAqObeij/3c5iLwp"
                                        "1l5pFAzmTmbqeWbeCtLPyLM+baEo65kbPP5nPg==";


// {
//     "identity": "user_1",
//     "admin": false,
//     "timestamp": 1455530614,
//     "expires": null,
//     "app_id": "io.realm.Test",
//     "path": "/test",
//     "access": ["download", "upload", "manage"]
// }
const char g_user_1_path_test_token[] = "ewogICAgImlkZW50aXR5IjogInVzZXJfMSIsCiAgICAiYWRtaW4iOiBmYWxzZSwKICAgICJ0aW1l"
                                        "c3RhbXAiOiAxNDU1NTMwNjE0LAogICAgImV4cGlyZXMiOiBudWxsLAogICAgImFwcF9pZCI6ICJp"
                                        "by5yZWFsbS5UZXN0IiwKICAgICJwYXRoIjogIi90ZXN0IiwKICAgICJhY2Nlc3MiOiBbImRvd25s"
                                        "b2FkIiwgInVwbG9hZCIsICJtYW5hZ2UiXQp9Cg=="
                                        ":"
                                        "3QsZ3qKFwtf8FisKy3vrOImtFt7gOnuKQl92/Ckmjq6ux9OFG74d6sj6UrvXPce4lgeqPbT4qTNd"
                                        "WgrXh8vBqS/R34a9v2dhV5Ys9xkQuEp04+FMswwx3hFbsxNOPaMVwtFHDzEgQus3ZP5rnpFUj/xZ"
                                        "Rh6IKIQ5EksIZTGnQscBdSM/n1Qlpa7SPx94SrtIwZvPO+OcqJj367PdnS+Ii0TvGj6WFIz44gJT"
                                        "GX03+qMqSdAuW/91xX013efiG+nYKKPMT4Z6+pQkyJ9C9eyvXXiJigXlBXb8wxrWBzpGoaPcWYYj"
                                        "q9wI3gXQ9i7DT+cH7gbDFWnweiorMvmAPkocSw==";


// Generated from test_token_readonly.json
// This token does not contain a "path" field, and therefore grants access to
// all Realms.
const char g_signed_test_user_token_readonly[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiXSwKICAg"
    "ICJ0aW1lc3RhbXAiOiAxNDU1NTMwNjE0LAogICAgImV4cGlyZXMiOiBudWxsLAogICAgImFwcF9p"
    "ZCI6ICJpby5yZWFsbS5UZXN0Igp9Cg=="
    ":"
    "e8VAY6/GO+JVi4D+inmKpPc0rgegKGlQ5gT9mpC+4DdWawiDZIyIyu6OfeNbDXYlSSryzQWJQ7zt"
    "ro/czrz0Q8bHVrUwzwI5jwogOPU4X64FthJ4LwPeP3DYKP3oeaZfn0m3ONQCcqbAjSXo/uMxgfRi"
    "ydYOWK6Vuoxxr8M7om7Y9nbdWp/ElNpYW2vbxZe87CDHt5WyV7qR8WH+xroNxMcngRd8lNquPW4g"
    "kSv8TltuIK/RY1Fwz0duhduqPDhmXL0tB/BiFytwZw4g4+Ag/N//3oUA+yMJl0zFvxnI5eRZBmqK"
    "5m6h1of9T5WRA2lW4X5HSo/Gi9h7cMCz8Nhpew==";

// Generated from test_token_for_path.json
// This token only grants access to the server path "/valid".
const char g_signed_test_user_token_for_path[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogbnVsbCwK"
    "ICAgICJhcHBfaWQiOiAiaW8ucmVhbG0uVGVzdCIsCiAgICAicGF0aCI6ICIvdmFsaWQiCn0K"
    ":"
    "U4mUfQuK2qA/uFIKXS1Sjx8PFmaR0P8FTB3wu5ybldWJAKcVeiOInW23ZBFqdHvG//VO7eb3QgeR"
    "4C99I09O6CZg/zP0FM4sdhYr4bQXo2y8nbJy4sxLr0EYLxpcrDCoXPmQr7LD+txmzcc4/rHnZfiA"
    "0Ujai7I66mBejZyjwNGIqZ2iShX7NQJE9MxA6vvgCtsUCdJJjQFCeT46+V2g2ggU9jjX/lVFlJF+"
    "si9NGnisRqiK9loHlU/duhB9C2dai2If+1ZxW5xlrYSuLQhubwNcL9jRwB2GpCaZBnZBBxa0g4ZG"
    "a2wKgKA7oLGQZU7+OPSu3mwsGDnuuHYNoZ9KiA==";

// Generated from test_token_expiration_unspecified.json
const char g_signed_test_user_token_expiration_unspecified[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJhcHBfaWQiOiAiaW8ucmVh"
    "bG0uVGVzdCIKfQo="
    ":"
    "POoY6mR2LuEMmZ2nmzpx33QbaG213ZeP51pBihqFhosTWg/xszncQf1rCntY6tbnw7qNL1Sj8v/T"
    "tGjpGayGGgaHWRIOVw8X+Oije7YI5zRTWyCQBO4S3rpgiqxgnuqFVwfflfa++CslaVDy8TZOLwJp"
    "07x/57tjm0qVooDklA+IuQvWWCgwwaFgP1KRCXQ1UjFED3H5hClfZA4yT082BEUlCK+TbxMD4DjZ"
    "MY4hApqs19sTw04/EdL9Mw7DqXYyGZYrjd7hMwYxsaIOLaiuzS8t2yXElvj79q9hnjtxhYW7Fgeh"
    "52l91uWGnlj0EkGTdNLrH6EWQh9tEFFQxdf/1w==";

// Generated from test_token_expiration_null.json
const char g_signed_test_user_token_expiration_null[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogbnVsbCwK"
    "ICAgICJhcHBfaWQiOiAiaW8ucmVhbG0uVGVzdCIKfQo="
    ":"
    "kPQwXUUFFVoDkmw02ouA1g7OlXcZ/IJPpqwJs9lIi1azpyuakBWgQ8VhnInCXh90CQXYhnteZlMw"
    "HYUZgt3/ED1jLu+nK2HlRKsmsOuAI20jMnHGGIZkql4/Ck9PEsvZ3huHGk5Jv9vpFp/dtnl1JXK2"
    "9XjdO8+1hU4boeJuKpTMDTPwGI9dxa8sTtvMMN7AVoPkKb1uqHZVsb5uRGE86Cyv58cvuj/EvZ1A"
    "yOCt5NGJwjTxydPgfX3QPcNMwDTHCRWYuoi2oTCINQHy8ebzXVLT1iy3adV4rM5bJukCnpLqHGlZ"
    "MIslk07zKdoj3igMIT47W9QwIuCw8x5f5cRIAg==";

// Generated from test_token_expiration_specified.json
const char g_signed_test_user_token_expiration_specified[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogICAzMDAw"
    "MDAwMDAwLAogICAgImFwcF9pZCI6ICJpby5yZWFsbS5UZXN0Igp9Cg=="
    ":"
    "Y82y0/oFnVGDcQAHIw59QVMnK9ji9byj6e3h1kwMwZeCDq2aQZvGhHsAfHjfFrJe7VfBFhW7o3Op"
    "JiSG3X8QAQwosUJMLLNQBxCbF+FOE5p2lD8ET0huDsafBKSszm4YrUrpgC6KkOlQqsaCH8bHhYqk"
    "YhQzyBEK2hc9hwuzic2RSLtpEttvz1Ew/QKlMqB9TVRpINdHaPfzh0EcWq72yza/Q+JWnLy+D24F"
    "2Thv0yKASw40Do56N8yoE/3gOPbrUrdmtjGClVbbR/lQ//z244qmik+mtEBuemYDTzO9/KrmBkJP"
    "YI2MJlgWBspWqWv9JxslYzh/gfD2uoyo1mJ2aQ==";

// Generated from test_token_sync_label_default.json
const char g_signed_test_user_token_sync_label_default[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogbnVsbCwK"
    "ICAgICJhcHBfaWQiOiAiaW8ucmVhbG0uVGVzdCIsCiAgICAic3luY19sYWJlbCI6ICJkZWZhdWx0"
    "Igp9Cg=="
    ":"
    "myXAxeVDbu+RhbcrCHPCll0xExQOqIj9TlZ9RXlRQGhfcRvAqVgO4FZQLl+qp+DGICrG7I5OYXXA"
    "K+cfhAJ5G/xQxdQdUuuYfvzE3dg6xuNkL7a41/SXcnVu1U49mGtZF+55+S2DpEmI4TfVlNdTLEci"
    "mizXRZYeq8OdgI2kBGisKw7wSVCaeJ7pV5gzs8dRbZg6OF3OkCPJiWPxvCWEUhIVEn49pNQ0Q2E6"
    "F9x2Ckba7LMUY2VupahVir/+4u4Y9hbwUJ3fEnKkzxUErh9Gao4+0jLK042y5+cfJLhnzGHOVIXt"
    "2/PkHx3oDpcl5O9Gc1qAlt7O6lN8qOXS4p1yIQ==";

// Generated from test_token_sync_label_custom.json
const char g_signed_test_user_token_sync_label_custom[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogbnVsbCwK"
    "ICAgICJhcHBfaWQiOiAiaW8ucmVhbG0uVGVzdCIsCiAgICAic3luY19sYWJlbCI6ICJjdXN0b20i"
    "Cn0K"
    ":"
    "T4Ajej3JD3wRjbcPVXWdPzgPngf0aCIDN6JiFKkvbSyh5cDOibx6XzUVQrjHGDI/IU1VT2Hs+V0B"
    "KVHKVKVJV22zaCOdIGOQ4QtOh9InnqAGghz+8IJHZawZIqdPByy4WtsPX6EITU2yYUfZ0YIzVMmD"
    "f5bB5JY532vifQsc0rpJaFztKekQCx7j02opvNeyXg6jEoFwY62uIGbS83FnLkVOE0uZ8XK6JuWK"
    "69RzxOGLyh97goWFtx+8Xp+fz82slRCxlFas8SIkxdj0vfIHcr8BnhhDgbsXDBPvbW55fDnwTlzA"
    "1TvQPRWX12PXn0jJKSTFyC73RO26oorfQewTeA==";

const std::string developer_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0OTI2MTEzMDN9.cf2s0bUM/da"
                                    "Hq/yp7PG6csktIgObeVeUvmObD8JOUCwC2uA3rMLGjEHPKxxmvZJtSoQq49PkhTpvCSk1ewKR"
                                    "R67U9J/AE1czBQPiit4FgTj/oujXIpNVio9t5oeSd3XqNg66HZhi5F+wsMOJ2hmxL9S+OBjQU"
                                    "yUchEPksKubiFKUPEktdmjewNp2VmdnPNjAdqmIhRyeHSQhl494lOlK/gyae2RUh2wWO1j9K0"
                                    "o24nd0VIr4+61UD/aMDsF70vXEkqPxboB83yu51CpkmqFJ0xklA9f9M1uKUgsOwX07Uf4plRE"
                                    "tYb1QxmoPdnQH3VPvVt4i0Yko/dMgpnuSJj30gQ==";

const std::string enterprise_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJBZGFwdGVyIjp0cnVlLCJCYWNrdXAiOnRyd"
                                     "WUsIkxvYWRCYWxhbmNpbmciOnRydWUsIk5vdGlmaWVyIjp0cnVlLCJTeW5jIjp0cnVlLCJpYXQ"
                                     "iOjE1MDc4MzM4MDJ9.SJn2ScgPXXJgRrXzKg-7HlCcwGQ5pQVLPXHiyJhRN54f4dAFsVDjyoqf"
                                     "yV2TO6CibP9UytIaztnnqjFrYq47YsZas9IlkZ6xv3wZRUF4Op_OcCdmuY9kAuGG-fNqBKWaM_"
                                     "woDzrFBigo1os7_FqFU1aC1Jmnh1NZjEPTP5euwWNF92BGcfAdHBkJU86MLvvXLHzGnedkm4Y5"
                                     "Sh5GlHz_UkC7OHflJoXbMUIUi9NMbqH3BbtvCwKHE1Vs-XShNFqbrAJTG6PJxJxIOXfSvgbnxe"
                                     "N3gpm69p-9cfM9O4TjNPoYZy4cPR5UG6NSJ4XDuSg7Wz8kdn9l3IzMiwi-a6Oofw";

const StringData g_partial_sync_result_sets_table_name = "class___ResultSets";


class FakeClock : public sync::Clock {
public:
    template <class R, class P>
    void add_time(std::chrono::duration<R, P> duration) noexcept
    {
        m_now += std::chrono::duration_cast<time_point::duration>(duration).count();
    }

    FakeClock() noexcept
        : m_now(0)
    {
    }

    template <class R, class P>
    FakeClock(std::chrono::duration<R, P> initial_value) noexcept
        : m_now(std::chrono::duration_cast<time_point::duration>(initial_value).count())
    {
    }

    time_point now() const noexcept override
    {
        return time_point{time_point::duration{m_now}};
    }

private:
    std::atomic<time_point::rep> m_now;
};


// HTTPRequestClient is used to send a HTTP request to a server, typically the
// Sync server, and wait for the response. HTTPRequestClient is constructed
// with the server endpoint and request. fetch_response() is a blocking
// function that fetches the HTTP response on the network. get_response()
// is used to retrieve the result afterwards.
class HTTPRequestClient {
public:
    using WriteCompletionHandler = util::UniqueFunction<void(std::error_code, size_t num_bytes_transferred)>;
    using ReadCompletionHandler = util::UniqueFunction<void(std::error_code, size_t num_bytes_transferred)>;

    util::PrefixLogger logger;

    HTTPRequestClient(const std::shared_ptr<util::Logger>& logger_ptr, const network::Endpoint& endpoint,
                      const HTTPRequest& request)
        : logger{"HTTP client: ", logger_ptr}
        , m_endpoint{endpoint}
        , m_http_client{*this, logger_ptr}
        , m_request{request}
    {
    }

    // The network activity happens in fetch_response().
    // fetch_response() is blocking.
    void fetch_response()
    {
        initiate_tcp_connect();
        m_service.run();
    }

    // get_response is used to retrieve the response after
    // fetch_response returns.
    HTTPResponse& get_response()
    {
        return m_response;
    }

    void async_write(const char* data, size_t size, WriteCompletionHandler handler)
    {
        m_socket.async_write(data, size, std::move(handler));
    }

    void async_read_until(char* buffer, size_t size, char delim, ReadCompletionHandler handler)
    {
        m_socket.async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler));
    }

    void async_read(char* buffer, size_t size, ReadCompletionHandler handler)
    {
        m_socket.async_read(buffer, size, m_read_ahead_buffer, std::move(handler));
    }

private:
    network::Service m_service;
    network::Socket m_socket{m_service};
    network::ReadAheadBuffer m_read_ahead_buffer;
    network::Endpoint m_endpoint;
    HTTPClient<HTTPRequestClient> m_http_client;
    HTTPRequest m_request;
    HTTPResponse m_response;

    void initiate_tcp_connect()
    {
        auto handler = [this](std::error_code ec) mutable {
            if (ec != util::error::operation_aborted)
                handle_tcp_connect(ec);
        };
        m_socket.async_connect(m_endpoint, std::move(handler));
        logger.info("Connecting to endpoint '%1:%2'", m_endpoint.address(), m_endpoint.port());
    }

    void handle_tcp_connect(std::error_code ec)
    {
        if (ec) {
            logger.debug("Failed to connect to endpoint '%1:%2': %3", m_endpoint.address(), m_endpoint.port(),
                         ec.message());
            stop();
            return;
        }

        m_socket.set_option(network::SocketBase::no_delay(true));
        logger.debug("Connected to endpoint '%1:%2'", m_endpoint.address(), m_endpoint.port());


        initiate_http_request();
    }

    void initiate_http_request()
    {
        auto handler = [this](HTTPResponse response, std::error_code ec) {
            if (ec != util::error::operation_aborted) {
                if (ec) {
                    logger.debug("HTTP response error, ec = %1", ec.message());
                    stop();
                    return;
                }
                handle_http_response(response);
            }
        };
        m_http_client.async_request(m_request, handler);
    }

    void handle_http_response(const HTTPResponse response)
    {
        logger.debug("HTTP response received, status = %1", int(response.status));
        m_response = response;
        stop();
    }

    void stop()
    {
        m_socket.close();
        m_service.stop();
    }
};


class MultiClientServerFixture {
public:
    enum class ClusterTopology { separate_nodes, two_tiers, one_node_per_tier };

    struct Config {
        Config() {}

        // Optional thread-safe logger. If none is specified, the one available
        // through unit_test::TestContext will be used.
        std::shared_ptr<util::Logger> logger;

        // These values will disable the heartbeats by default.
        milliseconds_type client_ping_period = 100000000;  // do not send pings
        milliseconds_type client_pong_timeout = 100000000; // do not expect pongs
        milliseconds_type server_connection_reaper_timeout = 100000000;
        milliseconds_type server_connection_reaper_interval = 100000000;

        long server_max_open_files = 64;

        bool enable_server_ssl = false;

        std::string server_ssl_certificate_path = get_test_resource_path() + "test_sync_ca.pem";
        std::string server_ssl_certificate_key_path = get_test_resource_path() + "test_sync_key.pem";

        bool disable_upload_compaction = false;

        bool disable_history_compaction = false;
        std::chrono::seconds history_ttl = std::chrono::seconds::max();
        std::chrono::seconds history_compaction_interval = std::chrono::seconds{3600};
        const Clock* history_compaction_clock = nullptr;

        size_t max_download_size = 0x1000000; // 16 MB as in Server::Config

#if REALM_DISABLE_SYNC_MULTIPLEXING
        bool one_connection_per_session = true;
#else
        bool one_connection_per_session = false;
#endif

        bool disable_upload_activation_delay = false;

        ClusterTopology cluster_topology = ClusterTopology::separate_nodes;

        std::string authorization_header_name = "Authorization";

        // Run servers without public key if `server_public_key_path` is the
        // empty string.
        std::string server_public_key_path = test_server_key_path();

        std::optional<std::array<char, 64>> server_encryption_key;

        int server_max_protocol_version = 0;

        std::set<file_ident_type> server_disable_download_for;

        std::function<Server::SessionBootstrapCallback> server_session_bootstrap_callback;

        std::shared_ptr<BindingCallbackThreadObserver> socket_provider_observer;
    };


    MultiClientServerFixture(int num_clients, int num_servers, std::string server_dir,
                             unit_test::TestContext& test_context, Config config = {})
        : m_logger{config.logger ? config.logger : test_context.logger}
        , m_num_servers{num_servers}
        , m_num_clients{num_clients}
        , m_enable_server_ssl{config.enable_server_ssl}
        , m_test_context{test_context}
    {
        REALM_ASSERT(num_servers >= 1);

        m_server_loggers.resize(num_servers);
        m_client_loggers.resize(num_clients);

        if (num_servers == 1) {
            m_server_loggers[0] = std::make_shared<util::PrefixLogger>("Server: ", m_logger);
        }
        else {
            for (int i = 0; i < num_servers; ++i) {
                std::string prefix = "Server[" + std::to_string(i + 1) + "]: ";
                m_server_loggers[i] = std::make_shared<util::PrefixLogger>(std::move(prefix), m_logger);
            }
        }

        if (num_clients == 1) {
            m_client_loggers[0] = std::make_shared<util::PrefixLogger>("Client: ", m_logger);
        }
        else {
            for (int i = 0; i < num_clients; ++i) {
                std::string prefix = "Client[" + std::to_string(i + 1) + "]: ";
                m_client_loggers[i] = std::make_shared<util::PrefixLogger>(std::move(prefix), m_logger);
            }
        }

        m_servers.resize(num_servers);
        m_server_ports.resize(num_servers);
        std::string listen_address = "localhost";
        std::string listen_port = ""; // Assign automatically
        for (int i = 0; i < num_servers; ++i) {
            std::string dir_name = "server";
            if (num_servers > 1) {
                dir_name += "-" + std::to_string(i + 1);
            }
            std::string dir = util::File::resolve(dir_name, server_dir);
            util::try_make_dir(dir);
            std::optional<PKey> public_key;
            if (!config.server_public_key_path.empty())
                public_key = PKey::load_public(config.server_public_key_path);
            Server::Config config_2;
            config_2.max_open_files = config.server_max_open_files;
            config_2.logger = m_server_loggers[i];
            config_2.token_expiration_clock = &m_fake_token_expiration_clock;
            config_2.ssl = m_enable_server_ssl;
            config_2.ssl_certificate_path = config.server_ssl_certificate_path;
            config_2.ssl_certificate_key_path = config.server_ssl_certificate_key_path;
            config_2.connection_reaper_timeout = config.server_connection_reaper_timeout;
            config_2.connection_reaper_interval = config.server_connection_reaper_interval;
            config_2.max_download_size = config.max_download_size;
            config_2.tcp_no_delay = true;
            config_2.authorization_header_name = config.authorization_header_name;
            config_2.encryption_key = config.server_encryption_key;
            config_2.max_protocol_version = config.server_max_protocol_version;
            config_2.disable_download_for = std::move(config.server_disable_download_for);
            config_2.session_bootstrap_callback = std::move(config.server_session_bootstrap_callback);
            m_servers[i] = std::make_unique<Server>(std::move(dir), std::move(public_key), std::move(config_2));
            m_servers[i]->start(listen_address, listen_port);
            m_server_ports[i] = m_servers[i]->listen_endpoint().port();
        }

        m_clients.resize(num_clients);
        for (int i = 0; i < num_clients; ++i) {
            Client::Config config_2;

            m_client_socket_providers.push_back(std::make_shared<websocket::DefaultSocketProvider>(
                m_client_loggers[i], "", config.socket_provider_observer,
                websocket::DefaultSocketProvider::AutoStart{false}));
            config_2.socket_provider = m_client_socket_providers.back();
            config_2.logger = m_client_loggers[i];
            config_2.reconnect_mode = ReconnectMode::testing;
            config_2.ping_keepalive_period = config.client_ping_period;
            config_2.pong_keepalive_timeout = config.client_pong_timeout;
            config_2.one_connection_per_session = config.one_connection_per_session;
            config_2.disable_upload_activation_delay = config.disable_upload_activation_delay;
            config_2.fix_up_object_ids = true;
            m_clients[i] = std::make_unique<Client>(std::move(config_2));
        }

        m_server_threads.resize(num_servers);

        m_simulated_server_error_rates.resize(num_servers);
        m_simulated_client_error_rates.resize(num_clients);

        m_allow_server_errors.resize(num_servers, 0);

        m_connection_state_change_listeners.resize(num_clients);
    }

    MultiClientServerFixture(const MultiClientServerFixture&) = delete;

    ~MultiClientServerFixture()
    {
        unit_test::TestContext& test_context = m_test_context;
        stop();
        for (int i = 0; i < m_num_clients; ++i) {
            m_clients[i]->shutdown_and_wait();
        }
        m_client_socket_providers.clear();
        for (int i = 0; i < m_num_servers; ++i) {
            if (m_server_threads[i].joinable())
                CHECK(!m_server_threads[i].join());
            CHECK_LESS_EQUAL(m_servers[i]->errors_seen(), m_allow_server_errors[i]);
        }
    }

    using ErrorHandler = void(Status status, bool is_fatal);

    // Set an error handler to be used for all sessions of the specified client
    // (\a handler will be copied for each session). Must be called before
    // make_session().
    void set_client_side_error_handler(int client_index, std::function<ErrorHandler> handler)
    {
        auto handler_wrapped = [handler = std::move(handler)](ConnectionState state,
                                                              std::optional<SessionErrorInfo> error_info) {
            if (state != ConnectionState::disconnected)
                return;
            REALM_ASSERT(error_info);
            handler(error_info->status, error_info->is_fatal);
        };
        m_connection_state_change_listeners[client_index] = std::move(handler_wrapped);
    }

    void set_client_side_error_rate(int client_index, int n, int m)
    {
        REALM_ASSERT(client_index >= 0 && client_index < m_num_clients);
        auto sim = std::make_pair(n, m);
        // Save the simulated error rate
        m_simulated_client_error_rates[client_index] = sim;

        // Post the new simulated error rate
        using sf = _impl::SimulatedFailure;
        // Post it onto the event loop to update the event loop thread
        m_client_socket_providers[client_index]->post([sim = std::move(sim)](Status) {
            sf::prime_random(sf::sync_client__read_head, sim.first, sim.second,
                             random_int<uint_fast64_t>()); // Seed from global generator
        });
    }

    // Must be called before start().
    void set_server_side_error_rate(int server_index, int n, int m)
    {
        REALM_ASSERT(server_index >= 0 && server_index < m_num_servers);
        m_simulated_server_error_rates[server_index] = std::make_pair(n, m);
    }

    void start()
    {
        for (int i = 0; i < m_num_servers; ++i)
            m_server_threads[i].start([this, i] {
                run_server(i);
            });

        for (int i = 0; i < m_num_clients; ++i) {
            m_client_socket_providers[i]->start();
        }
    }

    void start_client(int index)
    {
        REALM_ASSERT(index >= 0 && index < m_num_clients);
        m_client_socket_providers[index]->start();
    }

    // Use either the methods below or `start()`.
    void start_server(int index)
    {
        REALM_ASSERT(index >= 0 && index < m_num_servers);
        m_server_threads[index].start([this, index] {
            run_server(index);
        });
    }

    void stop_server(int index)
    {
        REALM_ASSERT(index >= 0 && index < m_num_servers);
        m_servers[index]->stop();
        unit_test::TestContext& test_context = m_test_context;
        if (m_server_threads[index].joinable())
            CHECK(!m_server_threads[index].join());
        CHECK_LESS_EQUAL(m_servers[index]->errors_seen(), m_allow_server_errors[index]);
    }

    void stop_client(int index)
    {
        REALM_ASSERT(index >= 0 && index < m_num_clients);
        auto& client = get_client(index);
        auto sim = m_simulated_client_error_rates[index];
        if (sim.first != 0) {
            using sf = _impl::SimulatedFailure;
            // If we're using a simulated failure, clear it by posting onto the event loop
            m_client_socket_providers[index]->post([](Status) mutable {
                sf::unprime(sf::sync_client__read_head); // Clear the sim failure set when started
            });
        }
        // We can't wait for clearing the simulated failure since some tests stop the client early
        client.shutdown_and_wait();
    }

    void stop()
    {
        for (int i = 0; i < m_num_clients; ++i)
            m_clients[i]->shutdown();
        for (int i = 0; i < m_num_servers; ++i)
            m_servers[i]->stop();
    }

    Client& get_client(int client_index) noexcept
    {
        return *m_clients[client_index];
    }

    Server& get_server(int server_index) noexcept
    {
        return *m_servers[server_index];
    }

    Session make_session(int client_index, int server_index, DBRef db, std::string realm_identifier,
                         Session::Config config = {})
    {
        //  *ClientServerFixture uses the service identifier "/realm-sync" to distinguish Sync
        //  connections, while BaaS does not.
        config.service_identifier = "/realm-sync";
        config.realm_identifier = std::move(realm_identifier);
        config.server_port = m_server_ports[server_index];
        config.server_address = "localhost";
        if (m_connection_state_change_listeners[client_index]) {
            config.connection_state_change_listener = m_connection_state_change_listeners[client_index];
        }
        else if (!config.connection_state_change_listener) {
            auto fallback_listener = [this](ConnectionState state, std::optional<SessionErrorInfo> error) {
                if (state != ConnectionState::disconnected)
                    return;
                REALM_ASSERT(error);
                unit_test::TestContext& test_context = m_test_context;
                test_context.logger->error("Client disconnect: %1 (is_fatal=%2)", error->status, error->is_fatal);
                bool client_error_occurred = true;
                CHECK_NOT(client_error_occurred);
                stop();
            };
            config.connection_state_change_listener = fallback_listener;
        }

        return Session{*m_clients[client_index], std::move(db), nullptr, nullptr, std::move(config)};
    }

    Session make_bound_session(int client_index, DBRef db, int server_index, std::string server_path,
                               Session::Config config = {})
    {
        return make_bound_session(client_index, std::move(db), server_index, std::move(server_path),
                                  g_signed_test_user_token, std::move(config));
    }

    Session make_bound_session(int client_index, DBRef db, int server_index, std::string server_path,
                               std::string signed_user_token, Session::Config config = {})
    {
        config.signed_user_token = std::move(signed_user_token);
        return make_session(client_index, server_index, std::move(db), std::move(server_path), std::move(config));
    }

    void cancel_reconnect_delay(int client_index)
    {
        get_client(client_index).cancel_reconnect_delay();
    }

    void allow_server_errors(int server_index, uint_fast64_t max_num_errors)
    {
        m_allow_server_errors[server_index] = uint_least64_t(max_num_errors);
    }

    void set_fake_token_expiration_time(std::int_fast64_t seconds_since_epoch)
    {
        using time_point = sync::Clock::time_point;
        auto duration_1 = std::chrono::seconds(seconds_since_epoch);
        auto duration_2 = std::chrono::duration_cast<time_point::duration>(duration_1);
        auto now_1 = m_fake_token_expiration_clock.now();
        auto now_2 = time_point{duration_2};
        REALM_ASSERT(now_2 >= now_1);
        m_fake_token_expiration_clock.add_time(now_2 - now_1);
    }

    void set_server_connection_reaper_timeout(milliseconds_type timeout)
    {
        for (int i = 0; i < m_num_servers; ++i)
            m_servers[i]->set_connection_reaper_timeout(timeout);
    }

    void close_server_side_connections()
    {
        for (int i = 0; i < m_num_servers; ++i)
            m_servers[i]->close_connections();
    }

    bool wait_for_session_terminations_or_client_stopped()
    {
        for (int i = 0; i < m_num_clients; ++i) {
            if (!m_clients[i]->wait_for_session_terminations_or_client_stopped())
                return false;
        }
        return true;
    }

    std::string map_virtual_to_real_path(int server_index, const std::string& virt_path)
    {
        std::string real_path;
        if (get_server(server_index).map_virtual_to_real_path(virt_path, real_path)) // Throws
            return real_path;
        throw std::runtime_error("Bad virtual path");
    }

    void inform_server_about_external_change(int server_index, const std::string& virt_path)
    {
        get_server(server_index).recognize_external_change(virt_path); // Throws
    }

private:
    using ConnectionStateChangeListener = Session::ConnectionStateChangeListener;
    using port_type = Session::port_type;
    std::shared_ptr<util::Logger> m_logger;
    const int m_num_servers;
    const int m_num_clients;
    const bool m_enable_server_ssl;
    unit_test::TestContext& m_test_context;
    std::vector<std::shared_ptr<util::Logger>> m_server_loggers;
    std::vector<std::shared_ptr<util::Logger>> m_client_loggers;
    std::vector<std::unique_ptr<Server>> m_servers;
    std::vector<std::unique_ptr<Client>> m_clients;
    std::vector<std::function<ConnectionStateChangeListener>> m_connection_state_change_listeners;
    std::vector<port_type> m_server_ports;
    std::vector<ThreadWrapper> m_server_threads;
    std::vector<std::shared_ptr<websocket::DefaultSocketProvider>> m_client_socket_providers;
    std::vector<std::pair<int, int>> m_simulated_server_error_rates;
    std::vector<std::pair<int, int>> m_simulated_client_error_rates;
    std::vector<uint_least64_t> m_allow_server_errors;
    FakeClock m_fake_token_expiration_clock;

    static std::optional<std::array<char, 64>> make_crypt_key(const std::string& key)
    {
        if (!key.empty()) {
            if (key.size() != 64)
                throw std::runtime_error("Encryption key has wrong size");
            std::array<char, 64> key_2;
            std::copy(key.begin(), key.end(), key_2.data());
            return key_2;
        }
        return {};
    }

    void run_server(int i)
    {
        auto do_run_server = [this, i] {
            auto sim = m_simulated_server_error_rates[i];
            if (sim.first != 0) {
                using sf = _impl::SimulatedFailure;
                sf::RandomPrimeGuard pg(sf::sync_server__read_head, sim.first, sim.second,
                                        random_int<uint_fast64_t>()); // Seed from global generator
                m_servers[i]->run();
            }
            else {
                m_servers[i]->run();
            }
            m_servers[i]->stop();
        };
        unit_test::TestContext& test_context = m_test_context;
        if (CHECK_NOTHROW(do_run_server()))
            return;
        stop();
        m_server_loggers[i]->error("Exception was throw from server[%1]'s event loop", i + 1);
    }
};


class ClientServerFixture : public MultiClientServerFixture {
public:
    using Config = MultiClientServerFixture::Config;

    ClientServerFixture(std::string server_dir, unit_test::TestContext& test_context, Config config = {})
        : MultiClientServerFixture{1, 1, std::move(server_dir), test_context, std::move(config)}
    {
    }

    // Set an error handler to be used for all sessions of the client (\a
    // handler will be copied for each session). Must be called before
    // make_session().
    void set_client_side_error_handler(std::function<ErrorHandler> handler)
    {
        MultiClientServerFixture::set_client_side_error_handler(0, std::move(handler));
    }

    // Must be called before start().
    void set_client_side_error_rate(int n, int m)
    {
        MultiClientServerFixture::set_client_side_error_rate(0, n, m);
    }

    // Must be called before start().
    void set_server_side_error_rate(int n, int m)
    {
        MultiClientServerFixture::set_server_side_error_rate(0, n, m);
    }

    Client& get_client() noexcept
    {
        return MultiClientServerFixture::get_client(0);
    }

    Server& get_server() noexcept
    {
        return MultiClientServerFixture::get_server(0);
    }

    Session make_session(DBRef db, std::string realm_identifier, Session::Config&& config = {})
    {
        return MultiClientServerFixture::make_session(0, 0, std::move(db), std::move(realm_identifier),
                                                      std::move(config));
    }
    Session make_session(std::string const& path, std::string realm_identifier, Session::Config&& config = {})
    {
        auto db = DB::create(make_client_replication(), path);
        return MultiClientServerFixture::make_session(0, 0, std::move(db), std::move(realm_identifier),
                                                      std::move(config));
    }

    Session make_bound_session(DBRef db, std::string server_path = "/test", Session::Config&& config = {})
    {
        return MultiClientServerFixture::make_bound_session(0, std::move(db), 0, std::move(server_path),
                                                            std::move(config));
    }

    Session make_bound_session(DBRef db, std::string server_path, std::string signed_user_token,
                               Session::Config&& config = {})
    {
        return MultiClientServerFixture::make_bound_session(0, std::move(db), 0, std::move(server_path),
                                                            std::move(signed_user_token), std::move(config));
    }

    void cancel_reconnect_delay()
    {
        MultiClientServerFixture::cancel_reconnect_delay(0); // Throws
    }

    void allow_server_errors(uint_fast64_t max_num_errors)
    {
        MultiClientServerFixture::allow_server_errors(0, max_num_errors);
    }

    std::string map_virtual_to_real_path(const std::string& virt_path)
    {
        return MultiClientServerFixture::map_virtual_to_real_path(0, virt_path); // Throws
    }

    void inform_server_about_external_change(const std::string& virt_path)
    {
        MultiClientServerFixture::inform_server_about_external_change(0, virt_path); // Throws
    }
};


class RealmFixture {
public:
    using ErrorHandler = MultiClientServerFixture::ErrorHandler;

    struct Config : Session::Config {
        util::UniqueFunction<ErrorHandler> error_handler;
    };

    RealmFixture(ClientServerFixture&, const std::string& real_path, const std::string& virt_path, Config = {});
    RealmFixture(MultiClientServerFixture&, int client_index, int server_index, const std::string& real_path,
                 const std::string& virt_path, Config = {});
    ~RealmFixture() noexcept;

    void empty_transact();
    void nonempty_transact();

    using TransactFunc = util::FunctionRef<bool(Transaction&)>;

    /// Perform a non-serialized transaction synchronously.
    bool transact(TransactFunc);

    bool wait_for_upload_complete_or_client_stopped();
    bool wait_for_download_complete_or_client_stopped();

    using WaitOperCompletionHandler = Session::WaitOperCompletionHandler;

    void async_wait_for_sync_completion(WaitOperCompletionHandler);
    void async_wait_for_upload_completion(WaitOperCompletionHandler);
    void async_wait_for_download_completion(WaitOperCompletionHandler);

private:
    struct SelfRef {
        util::Mutex mutex;
        RealmFixture* ref = nullptr;
        SelfRef(RealmFixture* r)
            : ref{r}
        {
        }
    };

    const std::shared_ptr<SelfRef> m_self_ref;
    DBRef m_db;
    sync::Session m_session;

    Config setup_error_handler(Config&&);
};


inline RealmFixture::RealmFixture(ClientServerFixture& client_server_fixture, const std::string& real_path,
                                  const std::string& virt_path, Config config)
    : m_self_ref{std::make_shared<SelfRef>(this)}                                                            // Throws
    , m_db{DB::create(make_client_replication(), real_path)}                                                 // Throws
    , m_session{client_server_fixture.make_session(m_db, virt_path, setup_error_handler(std::move(config)))} // Throws
{
}


inline RealmFixture::RealmFixture(MultiClientServerFixture& client_server_fixture, int client_index, int server_index,
                                  const std::string& real_path, const std::string& virt_path, Config config)
    : m_self_ref{std::make_shared<SelfRef>(this)}            // Throws
    , m_db{DB::create(make_client_replication(), real_path)} // Throws
    , m_session{client_server_fixture.make_session(client_index, server_index, m_db, virt_path,
                                                   setup_error_handler(std::move(config)))} // Throws
{
}

inline RealmFixture::~RealmFixture() noexcept
{
    util::LockGuard lock{m_self_ref->mutex};
    m_self_ref->ref = nullptr;
}

inline void RealmFixture::empty_transact()
{
    transact([](Transaction&) {
        return true;
    });
}

inline void RealmFixture::nonempty_transact()
{
    auto func = [](Transaction& tr) {
        TableRef table = tr.get_or_add_table_with_primary_key("class_Table", type_Int, "id");
        int id = 1;
        bool did_create = false;
        while (!did_create)
            table->create_object_with_primary_key(id++, &did_create);
        return true;
    };
    transact(func);
}

inline bool RealmFixture::transact(TransactFunc transact_func)
{
    auto tr = m_db->start_write(); // Throws
    if (!transact_func(*tr))       // Throws
        return false;
    version_type new_version = tr->commit();        // Throws
    m_session.nonsync_transact_notify(new_version); // Throws
    return true;
}

inline bool RealmFixture::wait_for_upload_complete_or_client_stopped()
{
    return m_session.wait_for_upload_complete_or_client_stopped();
}

inline bool RealmFixture::wait_for_download_complete_or_client_stopped()
{
    return m_session.wait_for_download_complete_or_client_stopped();
}

inline void RealmFixture::async_wait_for_sync_completion(WaitOperCompletionHandler handler)
{
    m_session.async_wait_for_sync_completion(std::move(handler));
}

inline void RealmFixture::async_wait_for_upload_completion(WaitOperCompletionHandler handler)
{
    m_session.async_wait_for_upload_completion(std::move(handler));
}

inline void RealmFixture::async_wait_for_download_completion(WaitOperCompletionHandler handler)
{
    m_session.async_wait_for_download_completion(std::move(handler));
}

inline RealmFixture::Config RealmFixture::setup_error_handler(Config&& config)
{
    if (config.error_handler) {
        config.connection_state_change_listener =
            [handler = std::move(config.error_handler)](ConnectionState state,
                                                        const std::optional<SessionErrorInfo>& error_info) {
                if (state != ConnectionState::disconnected)
                    return;
                REALM_ASSERT(error_info);
                handler(error_info->status, error_info->is_fatal);
            };
    }
    return std::move(config);
}
} // namespace realm::fixtures
